跳到主要内容

RocketMQ 学习02 消息收发基本使用

配置环境

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>

消息发送

消息发送者步骤分析

  1. 创建消息生产者 Producer,并制定生产者组名
  2. 指定 Nameserver 地址
  3. 启动 Producer
  4. 创建消息对象,指定主题 Topic、Tag 和消息体
  5. 发送消息
  6. 关闭生产者 Producer

发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

public class SyncProducer {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者 producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2. 指定 Nameserver 地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3. 启动 producer
producer.start();

for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/*
参数一:消息主题 Topic
参数二:消息 Tag
参数三:消息内容
*/
Message msg = new Message("MyTopic01", "Tag1", ("Hello World" + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送状态:" + status);
System.out.println("发送结果:" + result);

//线程睡1秒
// TimeUnit.SECONDS.sleep(1);
}

//6.关闭生产者producer
producer.shutdown();
}
}

等执行完成后就能在控制台看到发送过来的数据了(注意,它不是实时的,可能要等上一两分钟)

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。

public class AsyncProducer {

public static void main(String[] args) throws Exception {
// 1. 创建消息生产者 producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定 Nameserver 地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
// 3. 启动 producer
producer.start();
// 因为是异步的,所以不能在请求执行前结束(调用 shutdown),所以这里使用 CountDownLatch 来做栅栏
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
// 4. 创建消息对象,指定主题 Topic、Tag 和消息体
/*
参数一:消息主题Topic
参数二:消息Tag
参数三:消息内容
*/
Message msg = new Message("MyTopic02", "Tag2", ("Hello World" + i).getBytes());
// 5. 发送异步消息
producer.send(msg, new SendCallback() {
/**
* 发送成功回调函数
*/
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult);
latch.countDown();
}

/**
* 发送失败回调函数
*/
public void onException(Throwable e) {
System.out.println("发送异常:" + e);
latch.countDown();
}
});

//线程睡1秒
// TimeUnit.SECONDS.sleep(1);
}

System.out.println("测试是否会在上面的请求之间执行(如果是则是异步的)");
latch.await();
// 6. 关闭生产者 producer
producer.shutdown();
}
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

producer 向 broker 发送消息,执行 API 时直接返回,不等待 broker 服务器的结果。

public class OneWayProducer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者 producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定 Nameserver 地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
//3.启动 producer
producer.start();

for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题 Topic、Tag 和消息体
/*
参数一:消息主题 Topic
参数二:消息Tag
参数三:消息内容
*/
Message msg = new Message("MyTopic03", "Tag3", ("Hello World,单向消息" + i).getBytes());
//5.发送单向消息
producer.sendOneway(msg);

//线程睡 1 秒
// TimeUnit.SECONDS.sleep(5);
}

//6.关闭生产者 producer
producer.shutdown();
}
}

消息消费

消息消费者步骤分析

  1. 创建消费者 Consumer,制定消费者组名
  2. 指定 Nameserver 地址
  3. 订阅主题 Topic 和 Tag
  4. 设置回调函数,处理消息
  5. 启动消费者 Consumer

负载均衡模式(默认)

注意:这里的消费者是监听生产者的消息的,所以可以直接挂着等生产者发送消息

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建消费者 Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定 Nameserver 地址
consumer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
// 3.订阅主题 Topic 和 Tag
consumer.subscribe("MyTopic01", "Tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 消费消息
* @param list 取得的消息,注意,它不是一个一个取得消息的,它一次监听取到的消息可能有多个
* @param consumeConcurrentlyContext 消费者并发消费上下文
* @return 当前消息被消费的状态
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(System.out::println);
System.out.println("===================================");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者 consumer
consumer.start();
}
}

可以看到是负载均衡的

广播模式

指的是不同的 Comsumer 接收到的消息是一样的,适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。如下图所示:

通知全部消费者都收到这条消息:

public class Consumer {

public static void main(String[] args) throws Exception {
// 1. 创建消费者 Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. 指定 Nameserver 地址
consumer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");
// 3. 订阅主题 Topic 和 Tag
consumer.subscribe("MyTopic01", "Tag2");
consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者 consumer
consumer.start();
}
}

广播消费模式下不支持顺序消息。

广播模式下,消息队列 RocketMQ 保证 每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。